-
Notifications
You must be signed in to change notification settings - Fork 65
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: extract & insert sidecar batches in replay
's action iterator
#679
base: main
Are you sure you want to change the base?
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #679 +/- ##
==========================================
+ Coverage 84.36% 84.42% +0.06%
==========================================
Files 75 75
Lines 17654 18202 +548
Branches 17654 18202 +548
==========================================
+ Hits 14893 15367 +474
- Misses 2052 2055 +3
- Partials 709 780 +71 ☔ View full report in Codecov by Sentry. |
b2c5001
to
00af1f9
Compare
let sidecar_files: Result<Vec<_>, _> = visitor | ||
.sidecars | ||
.iter() | ||
.map(|sidecar| Self::sidecar_to_filemeta(sidecar, &log_root)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if sidecar_to_file_meta could be a closure. We only use this once.
let sidecar_to_filemeta = |sidecar| {
let location = log_root.join("_sidecars/")?.join(&sidecar.path)?;
Ok(FileMeta {
location,
last_modified: sidecar.modification_time,
size: sidecar.size_in_bytes as usize,
})
}
And then map sidecar
visitor
.sidecars
.iter()
.map(sidecar_to_filemeta)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Give it a shot and see how it is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think it's a good idea to leave it as a separate function for unit testing purposes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd say either keep the separate function (if needed for testing) or embed the logic directly in the map
call? What purpose does a separately named closure serve?
(aside: not sure if cargo fmt will like my indentation choice above -- depends on whether the (
or {
is more important)
} | ||
|
||
fn process_single_checkpoint_batch( | ||
parquet_handler: Arc<dyn ParquetHandler>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
iirc, we want to avoid passing handlers around. Only reference to the engine. I think it's because we want to make it clear that the handler is tied to the engine and not to encourage holding an Arc ref to the handler.
cc @zachschuermann to double check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I recall running into lifetime issues when passing the entire engine. I believe we would have to explicitly tie the iterator's lifetime to that of the engine?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can also pass an Arc.
Basically the iterator needs to hold a reference for the entire duration it's lazily evaluating. So you want to give it a reference it can hold for a long time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But this change extends all the way to changing the scan_data
function signature to explicitly tie the engines lifetime to the iterator.
pub fn scan_data<'a>(
&self,
engine: &'a dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanData>> + 'a> {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mmm, so the basic issue is that we have delayed reading of parquet files, so at some point we want an item off the iterator, and to produce it, we need to read some parquet, so we need a handler. Previously we could do all the read calls up front and then just map off that iterator, so we didn't need an engine ref plumbed through.
I think if this is all internal, i.e., we don't want to expose any of these function signatures to engines (especially in the FFI), then cloning the Arc
s is fine (it's very cheap. as a suggestion we usually put // cheap arc clone
at those clone sites to make it clear).
If we do want to ever expose this, we'll need to think more, but afaict, we don't.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the look @nicklan, just to confirm we will go ahead and clone the parquet handler
kernel/src/actions/visitors.rs
Outdated
// We read checkpoint batches with the sidecar action. This results in empty paths | ||
// if a row is not a sidecar action. We do not want to create a sidecar action for | ||
// these rows. | ||
if path.is_empty() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks wrong. Are we mishandling column nullability somewhere, that can cause empty strings to be returned instead of NULL?
It seems like this sort of issue has shown up a few times recently -- do we have a lurking bug somewhere? or is null handling just error-prone in general?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I encountered Sidecar actions with empty strings for path
when running test_create_checkpoint_stream_reads_parquet_checkpoint_batch
at first. I believe it is because of the way I am creating the dummy checkpoint batch (beginning with a json string, specifically add_batch_simple()
). The fix is not obvious to me though...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For more context, when I create a dummy engine data batch from a json string (without sidecar actions) with the SyncEngine
's json handler:
pub(crate) fn parse_json( |
and the sidecar action is included in the output_schema, I find the above error case. When the sidecar action is not included in the output_schema to the test util, the sidecar column handles nullability correctly. Does this seem like an issue with the
SyncEngine
's json handler's core functionality @scovich? This isn't an area I'm deeply familiar with but I'll look into it, my investigation might take a bit longer.
Below is the json string which is converted to engine data with string_array_to_engine_data
, which is finally passed to the sync engine.
r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code has disappeared in the latest version, did you figure out a fix?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found a fix for the test case which created batches that had this weird empty-string Sidecar path field.
However, I am concerned that there may be something wrong with the SyncEngine
's json handler's functionality as it allowed me to create this malformed batch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, how was the batch malformed (= physically invalid)?
It seemed like the test was simply passing an empty string, which is schema-compatible. Arrow has no way to know that Delta puts additional constraints on the field value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the confusion, the test case this issue originated from was: test_create_checkpoint_stream_reads_parquet_checkpoint_batch_without_sidecars
I've left a comment below with more context
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#692 solved this, right?
kernel/src/log_segment.rs
Outdated
// If sidecars files exist, read the sidecar files and return the iterator of sidecar batches | ||
// to replace the checkpoint batch in the top level iterator | ||
Ok(Right(parquet_handler.read_parquet_files( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is subtle -- replacing the top-level means all non-file actions will be lost. This is only safe correct if all checkpoint scans are exclusively requesting adds or exclusively requesting non-file actions. I'm pretty sure it will break our inspect-table
example that visits all actions during log replay.
We would either need to keep returning the top-level actions unconditionally (safer) or inspect the read schema to see whether we need non-file actions. Simper feels (a lot) safer to me, and seems unlikely to cause any measurable performance hit -- each checkpoint part has thousands of actions, vs. dozens in the top level. manifest.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on the above, we might be able to eliminate multiple left/right use sites, by careful management of type signatures. Conceptually, we would always do a map over the top level checkpoint iterator, producing the following output:
let sidecar_content = Self::process_sidecars(top_level_batch, ...); // returns Option<impl Iterator>
std::iter::once(top_level_batch).chain(sidecar_content.into_iter().flatten())
We could pass a flag into process_sidecars
that short circuits it to None
(pretending no sidecars were found), or we could just cheat and do a spurious map call, just to get the correct signature:
std::iter::once(top_level_batch).chain(None.into_iter().flatten())
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ugh... compiler didn't like my toy example that did the spurious map
call...
note: no two closures, even if identical, have the same type
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for the catch @scovich I was operating under the assumption that
all checkpoint scans are exclusively requesting adds or exclusively requesting non-file actions
would be always true. But keeping the top-level actions unconditionally feels like a much safer approach. I'll move forward with this and have noted this decision in the design doc.
replay
's action iterator when necessary
replay
's action iterator when necessaryreplay
's action iterator
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shape of the PR looks good. A few questions and nits (and waiting for it to exit "draft" status)
kernel/src/actions/visitors.rs
Outdated
// We read checkpoint batches with the sidecar action. This results in empty paths | ||
// if a row is not a sidecar action. We do not want to create a sidecar action for | ||
// these rows. | ||
if path.is_empty() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code has disappeared in the latest version, did you figure out a fix?
kernel/src/log_segment.rs
Outdated
skip_sidecar_search, | ||
)?; | ||
|
||
Ok(std::iter::once(Ok((checkpoint_batch, false))).chain( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you really wanted to get fancy, could do:
let top_iterable = need_nonfile_actions.then(|| Ok((checkpoint_batch, false)));
Ok(sidecar_content...chain(top_iterable))
... where need_nonfile_actions
comes from a schema test, similar but opposite to the skip_sidecar_search
. But it's probably not worth the complexity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, I'd like to move forward with unconditionally including the checkpoint batch for simplicities sake
let sidecar_files: Result<Vec<_>, _> = visitor | ||
.sidecars | ||
.iter() | ||
.map(|sidecar| Self::sidecar_to_filemeta(sidecar, &log_root)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd say either keep the separate function (if needed for testing) or embed the logic directly in the map
call? What purpose does a separately named closure serve?
(aside: not sure if cargo fmt will like my indentation choice above -- depends on whether the (
or {
is more important)
541655e
to
4631bae
Compare
kernel/src/log_segment/tests.rs
Outdated
|
||
mock_table | ||
.parquet_checkpoint( | ||
add_batch_simple(get_log_add_schema().clone()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@scovich this is the batch creation I mentioned in the other comment.
Previously, when passing a schema that included the sidecar action to add_batch_simple
, I found that the SidecarVisitor would find sidecar actions which had the empty path field
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed by #692, right?
replay
's action iteratorreplay
's action iterator
(nit: changed |
7ec299b
to
ff67fcf
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm!
What changes are proposed in this pull request?
Summary
This PR introduces foundational changes required for V2 checkpoint read support. The high-level changes required for v2 checkpoint support are:
Item 1. Allow log segments to be built with V2 checkpoint files
Item 2. Allow log segment
replay
functionality to retrieve actions from sidecar files if need be.This PR specifically adds support for Item 2.
This PR does not introduce full v2Checkpoints reader/writer support as we are missing support for Item 1, meaning log segments can never have V2 checkpoint files in the first place. That functionality will be completed in PR #685 which is stacked on top of this PR. However, the changes to log
replay
done here are compatible with tables using V1 checkpoints, allowing us to safely merge the changes here.Changes
For each batch of
EngineData
from a checkpoint file:SidecarVisitor
to scan each batch for sidecar file paths embedded in sidecar actions.- Note: the original checkpoint batch is still included in the iterator
Notes:
checkpoint_read_schema
does not have file actions, we do not need to scan the batch with theSidecarVisitor
and can leave the batch as-is in the top-level iterator.SidecarVisitor
and can leave the batch as-is in the top-level iterator.resolves #670
How was this change tested?
Although log segments can not yet have V2 checkpoints, we can easily mock batches that include sidecar actions that we can encounter in V2 checkpoints.
test_sidecar_to_filemeta_valid_paths
Unit tests for process_single_checkpoint_batch:
test_checkpoint_batch_with_no_sidecars_returns_none
test_checkpoint_batch_with_sidecars_returns_sidecar_batches
test_checkpoint_batch_with_sidecar_files_that_do_not_exist
Unit tests for create_checkpoint_stream:
test_create_checkpoint_stream_errors_when_schema_has_remove_but_no_sidecar_action
test_create_checkpoint_stream_errors_when_schema_has_add_but_no_sidecar_action
test_create_checkpoint_stream_returns_checkpoint_batches_as_is_if_schema_has_no_file_actions
test_create_checkpoint_stream_returns_checkpoint_batches_if_checkpoint_is_multi_part
test_create_checkpoint_stream_reads_parquet_checkpoint_batch_without_sidecars
test_create_checkpoint_stream_reads_json_checkpoint_batch_without_sidecars
test_create_checkpoint_stream_reads_checkpoint_batch_with_sidecar